Apache Flink এর প্রোগ্রামিং মডেলটি স্ট্রিম-প্রসেসিং এর জন্য ডিজাইন করা হয়েছে এবং এটি ব্যাচ এবং রিয়েল-টাইম ডেটা প্রসেসিং উভয় ক্ষেত্রেই ব্যবহৃত হয়। Flink এর প্রোগ্রামিং মডেলটি মূলত একটি distributed stream processing framework যা উচ্চ পারফরম্যান্স এবং লো-লেটেন্সি প্রদান করে। এটি event-driven আর্কিটেকচারে কাজ করে, যেখানে প্রতিটি ইভেন্ট বা ডেটা রেকর্ডকে আলাদাভাবে প্রসেস করা হয়।
Flink এর DataStream API হল streaming data প্রসেস করার জন্য একটি শক্তিশালী ইন্টারফেস। এটি অবিরত প্রবাহিত ডেটার উপর অপারেশন সম্পাদন করতে সক্ষম এবং এটি low-latency এবং fault-tolerant প্রসেসিং সমর্থন করে। Flink এর স্ট্রিম প্রোগ্রামগুলি সাধারণত একটি source, একটি বা একাধিক transformation, এবং একটি sink নিয়ে গঠিত।
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Data source - reading from a socket stream
DataStream<String> text = env.socketTextStream("localhost", 9999);
// Transformation - split and count words
DataStream<Tuple2<String, Integer>> wordCounts = text
.flatMap(new LineSplitter())
.keyBy(value -> value.f0)
.sum(1);
// Sink - print output
wordCounts.print();
env.execute("Word Count Example");
এই উদাহরণটি একটি সিম্পল Word Count প্রোগ্রাম, যা একটি সোকেট থেকে ডেটা পড়ে এবং প্রতিটি লাইনের শব্দ গুনে আউটপুট করে।
Apache Flink এর প্রোগ্রামিং মডেল খুবই শক্তিশালী এবং এটি বড় আকারের স্ট্রিম এবং ব্যাচ ডেটা প্রসেসিংয়ের জন্য উপযুক্ত।
Flink-এর Execution Environment এবং Context দুটি খুব গুরুত্বপূর্ণ উপাদান, যা Flink প্রোগ্রাম চালাতে এবং পরিচালনা করতে ব্যবহৃত হয়। নিচে এই দুটি উপাদানের ব্যাখ্যা দেওয়া হলো:
Execution Environment হলো Flink প্রোগ্রামের কেন্দ্রীয় বিন্দু, যা বিভিন্ন সেটআপ এবং সংযোগ তৈরি করে এবং প্রোগ্রামের পুরো লাইফসাইকেল পরিচালনা করে। এটি Flink API-এর মাধ্যমে কাজ করে এবং ডেটা প্রসেসিং টাস্ক শুরু করে।
Flink-এ সাধারণত তিন ধরনের Execution Environment রয়েছে:
Local Execution Environment:
ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment();
Remote Execution Environment:
ExecutionEnvironment env = ExecutionEnvironment.createRemoteEnvironment("host", port, "path/to/jar");
Default Execution Environment:
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
Execution Context Flink-এর ভিতরে একটি প্রোগ্রাম চলার সময় তার বর্তমান স্টেট এবং সেটিংস পরিচালনা করে। এটি Execution Environment-এর সাথে কাজ করে। Execution Context এর মাধ্যমে Flink প্রোগ্রাম এর Configuration, State Management, এবং Task Execution পরিচালিত হয়।
Execution Context সাধারণত ডেভেলপারের জন্য সরাসরি ব্যবহারের প্রয়োজন হয় না। এটি Flink Runtime দ্বারা ব্যবহৃত হয় এবং এটি Execution Environment এর মাধ্যমে কাজ করে।
import org.apache.flink.api.java.ExecutionEnvironment;
public class FlinkExample {
public static void main(String[] args) throws Exception {
// Execution Environment তৈরি করা
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
// ডেটাসেট তৈরি করা
DataSet<String> text = env.readTextFile("path/to/file.txt");
// ডেটাসেটের উপর ট্রান্সফরমেশন প্রয়োগ করা
DataSet<Integer> wordCounts = text
.flatMap(new LineSplitter())
.groupBy(0)
.sum(1);
// আউটপুট লিখে দেওয়া
wordCounts.writeAsCsv("output/path");
// Execution শুরু করা
env.execute("Flink Word Count Example");
}
}
এই উদাহরণে, ExecutionEnvironment.getExecutionEnvironment()
ব্যবহার করে Execution Environment তৈরি করা হয়েছে। তারপর এটি দিয়ে ডেটাসেট তৈরি, ট্রান্সফরমেশন অ্যাপ্লাই এবং আউটপুট প্রসেস করা হয়েছে।
এই দুইটি উপাদান Flink প্রোগ্রাম ডেভেলপ এবং এক্সিকিউশনের জন্য অত্যন্ত গুরুত্বপূর্ণ।
Apache Flink-এ Transformations হলো ডেটা প্রক্রিয়াকরণের বিভিন্ন অপারেশন যা স্ট্রিম বা ব্যাচ ডেটা প্রসেস করতে ব্যবহৃত হয়। Flink API বিভিন্ন ধরনের ট্রান্সফরমেশন অফার করে, যেমন Map
, Filter
, FlatMap
, KeyBy
, এবং Window
, যা ডেটা স্ট্রিমে প্রয়োজনীয় পরিবর্তন ও বিশ্লেষণ করতে ব্যবহৃত হয়।
Map
ফাংশন প্রতিটি ইনপুট রেকর্ডের উপর একটি অপারেশন প্রয়োগ করে এবং আউটপুট হিসেবে প্রতিটি ইনপুটের সাথে সম্পর্কিত একটি আউটপুট প্রদান করে।
কোড উদাহরণ:
DataStream<Integer> numbers = env.fromElements(1, 2, 3, 4, 5);
DataStream<Integer> squaredNumbers = numbers.map(value -> value * value);
বর্ণনা: এখানে, প্রতিটি সংখ্যা স্কোয়ার করা হচ্ছে, যেমন: ১ → ১, ২ → ৪, ইত্যাদি।
Filter
ট্রান্সফরমেশন একটি কন্ডিশন চেক করে এবং শুধুমাত্র সেই ইনপুট আইটেমগুলিকে আউটপুটে রাখে যা শর্তটি পূরণ করে।
কোড উদাহরণ:
DataStream<Integer> evenNumbers = numbers.filter(value -> value % 2 == 0);
বর্ণনা: উপরের উদাহরণে, শুধুমাত্র জোড় সংখ্যাগুলো ফিল্টার করা হচ্ছে।
FlatMap
একটি ট্রান্সফরমেশন যা প্রতিটি ইনপুট আইটেমের জন্য এক বা একাধিক আউটপুট প্রদান করতে পারে। এটি Map
এবং Filter
এর সংমিশ্রণ হিসেবে কাজ করে।
কোড উদাহরণ:
DataStream<String> sentences = env.fromElements("Apache Flink", "Big Data");
DataStream<String> words = sentences.flatMap((String sentence, Collector<String> out) -> {
for (String word : sentence.split(" ")) {
out.collect(word);
}
});
বর্ণনা: এখানে, প্রতিটি বাক্যকে শব্দে বিভক্ত করে পৃথক আউটপুট স্ট্রিম হিসেবে বের করা হচ্ছে।
KeyBy
ট্রান্সফরমেশন ইনপুট ডেটা স্ট্রিমকে একটি কী ফিল্ডের উপর ভিত্তি করে পার্টিশনে ভাগ করে। এটি গ্রুপিং বা অ্যাগ্রিগেশন অপারেশন করার জন্য ব্যবহৃত হয়।
কোড উদাহরণ:
DataStream<Tuple2<String, Integer>> items = env.fromElements(
new Tuple2<>("apple", 3),
new Tuple2<>("banana", 2),
new Tuple2<>("apple", 4)
);
KeyedStream<Tuple2<String, Integer>, String> keyedItems = items.keyBy(value -> value.f0);
বর্ণনা: এখানে, keyBy
অপারেশনের মাধ্যমে একই আইটেম (যেমন, "apple") একই গ্রুপে রাখা হচ্ছে।
Window
ট্রান্সফরমেশন ইনপুট স্ট্রিমকে উইন্ডোতে ভাগ করে দেয়, যা নির্দিষ্ট সময়কাল বা আইটেম সংখ্যা ভিত্তিক হতে পারে। এটি স্ট্রিম ডেটা প্রসেসিংয়ের জন্য খুব গুরুত্বপূর্ণ।
কোড উদাহরণ (Time Window):
DataStream<Tuple2<String, Integer>> windowedCounts = keyedItems
.timeWindow(Time.seconds(10))
.sum(1);
বর্ণনা: এখানে, ১০ সেকেন্ডের টাইম উইন্ডোতে প্রতিটি আইটেমের সংখ্যা যোগ করা হচ্ছে।
Apache Flink-এ ট্রান্সফরমেশন ব্যবহার করে ডেটা স্ট্রিম প্রসেসিং অনেক সহজ এবং কার্যকর হয়। এগুলো মূলত ডেটা ফিল্টার, গ্রুপিং, এবং উইন্ডো-ভিত্তিক প্রক্রিয়াকরণে সাহায্য করে।
Apache Flink এ Time Windows স্ট্রিম প্রসেসিংয়ের একটি গুরুত্বপূর্ণ অংশ, যা ডেটা স্ট্রিমকে নির্দিষ্ট সময়ের ইন্টারভালে বিভক্ত করে প্রসেস করতে সহায়তা করে। Flink এ তিনটি প্রধান ধরনের উইন্ডো রয়েছে: Tumbling Windows, Sliding Windows, এবং Session Windows। এদের প্রত্যেকটি ভিন্ন ধরণের ডেটা প্রসেসিং কেসের জন্য ব্যবহৃত হয়।
Tumbling Windows হল স্থির দৈর্ঘ্যের উইন্ডো যা ওভারল্যাপ ছাড়াই একটির পর একটি নির্দিষ্ট সময়ের পরপর তৈরি হয়। Tumbling উইন্ডো একটি নির্দিষ্ট সময় পরিসরে সমস্ত ডেটা সংগ্রহ করে এবং তারপর সেই উইন্ডো বন্ধ হয় ও প্রসেসিং হয়। নতুন উইন্ডো শুরু হওয়ার আগে পুরোনো উইন্ডো সম্পূর্ণরূপে বন্ধ হয়।
DataStream<Tuple2<String, Integer>> windowCounts = input
.keyBy(value -> value.f0)
.window(TumblingEventTimeWindows.of(Time.seconds(5)))
.sum(1);
উপরের উদাহরণে, TumblingEventTimeWindows.of(Time.seconds(5))
প্রতি ৫ সেকেন্ডের জন্য একটি উইন্ডো তৈরি করে।
Sliding Windows Tumbling Windows এর মতই কাজ করে, তবে এটি স্থির দৈর্ঘ্যের উইন্ডো যা ওভারল্যাপ করে। Sliding Windows এর দুটি প্যারামিটার থাকে: উইন্ডোর দৈর্ঘ্য এবং উইন্ডোর স্লাইড ইন্টারভাল। উইন্ডোর স্লাইড ইন্টারভাল যদি উইন্ডোর দৈর্ঘ্যের চেয়ে ছোট হয়, তবে উইন্ডোগুলি একে অপরকে ওভারল্যাপ করে। এটি একটি ইভেন্ট একাধিক উইন্ডোতে অন্তর্ভুক্ত হতে দেয়।
DataStream<Tuple2<String, Integer>> windowCounts = input
.keyBy(value -> value.f0)
.window(SlidingEventTimeWindows.of(Time.minutes(1), Time.seconds(10)))
.sum(1);
এই উদাহরণে, উইন্ডো প্রতি ১ মিনিটের জন্য তৈরি হয়, এবং প্রতি ১০ সেকেন্ডে স্লাইড করে। ফলে, একাধিক উইন্ডো একে অপরকে ওভারল্যাপ করবে।
Session Windows হল ডেটা স্ট্রিমে সেশন শনাক্ত করার জন্য ব্যবহৃত হয়। এটি স্ট্রিমের মধ্যে ইভেন্টগুলির মধ্যে inactivity gap বা নির্দিষ্ট সময়ের বিরতি (gap) এর উপর ভিত্তি করে উইন্ডো তৈরি করে। যদি একটি নির্দিষ্ট সময়ের মধ্যে কোনো ইভেন্ট না ঘটে, তবে উইন্ডোটি বন্ধ হয় এবং একটি নতুন সেশন শুরু হয়।
DataStream<Tuple2<String, Integer>> windowCounts = input
.keyBy(value -> value.f0)
.window(ProcessingTimeSessionWindows.withGap(Time.seconds(30)))
.sum(1);
এই উদাহরণে, ProcessingTimeSessionWindows.withGap(Time.seconds(30))
একটি সেশন উইন্ডো তৈরি করে, যা তখনই বন্ধ হবে যদি ৩০ সেকেন্ডের মধ্যে কোনো নতুন ইভেন্ট না ঘটে।
Flink এর Time Windows ব্যবহার করে আপনি বিভিন্ন ধরণের ডেটা এনালাইসিস করতে পারেন, যেমন রিয়েল-টাইম এনালাইসিস, ব্যাচ প্রসেসিং, এবং সেশন ভিত্তিক এনালাইসিস।
Apache Flink-এ Watermark এবং Event Time Processing হলো স্ট্রিম প্রসেসিং-এর খুব গুরুত্বপূর্ণ কনসেপ্ট, বিশেষ করে real-time data processing এর ক্ষেত্রে। Flink এমন ডেটা স্ট্রিম নিয়ে কাজ করতে পারে যেগুলো event-driven এবং যেগুলোর events কোনো নির্দিষ্ট সময়ে ঘটে। নিচে এই দুইটি বিষয় বিস্তারিতভাবে ব্যাখ্যা করা হলো।
Event Time হলো সেই সময় যখন একটি ইভেন্ট আসলেই ঘটেছিল। Flink-এ event time নির্ভর করে স্ট্রিমে থাকা ইভেন্টগুলোর টাইমস্ট্যাম্পের উপর, যা মূলত সোর্স থেকেই আসতে পারে। Event time প্রসেসিং বিশেষ করে real-time analytics এবং latency-sensitive অ্যাপ্লিকেশনগুলোর ক্ষেত্রে খুবই কার্যকর, কারণ এটি স্ট্রিমে আসা ইভেন্টগুলোর আসল টাইমস্ট্যাম্পকে ভিত্তি করে প্রসেসিং করে, ইভেন্টগুলি স্ট্রিমে আসার টাইম বা প্রসেসিং টাইমের উপর নির্ভর করে না।
Watermark হলো একটি মেকানিজম যা Flink-কে স্ট্রিমের event time ট্র্যাক করতে সহায়তা করে। Watermark স্ট্রিমের মধ্য দিয়ে প্রবাহিত হয় এবং Flink-কে বলে যে নির্দিষ্ট টাইমস্ট্যাম্প পর্যন্ত ইভেন্টগুলি এসেছে কিনা। Watermark ইভেন্ট টাইম উইন্ডোগুলি প্রসেস করতে এবং লেট ইভেন্টগুলি সনাক্ত করতে ব্যবহৃত হয়।
WatermarkStrategy
API ব্যবহার করে তৈরি করা হয়।Flink এ event time এবং watermark ব্যবহার করার একটি উদাহরণ:
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.triggers.EventTimeTrigger;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
public class FlinkWatermarkExample {
public static void main(String[] args) throws Exception {
// Execution Environment তৈরি করা
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Event time ব্যবহার করা হবে বলে নির্ধারণ করা
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
// Data stream পড়া এবং Watermark Strategy নির্ধারণ করা
DataStream<Event> events = env
.readTextFile("path/to/events")
.map(new EventParser())
.assignTimestampsAndWatermarks(
WatermarkStrategy
.<Event>forBoundedOutOfOrderness(Duration.ofSeconds(5))
.withTimestampAssigner((event, timestamp) -> event.getTimestamp())
);
// Event time এর উপর ভিত্তি করে উইন্ডো তৈরি করা
events
.keyBy(event -> event.getKey())
.window(TumblingEventTimeWindows.of(Time.minutes(1)))
.trigger(EventTimeTrigger.create())
.process(new CustomWindowFunction())
.print();
env.execute("Event Time Processing Example");
}
}
StreamExecutionEnvironment
তৈরি করা হয়েছে এবং event time নির্ধারণ করা হয়েছে।assignTimestampsAndWatermarks
ব্যবহার করে আমরা একটি watermark strategy নির্ধারণ করেছি যা ইভেন্টগুলির টাইমস্ট্যাম্প নিয়ে কাজ করে এবং ৫ সেকেন্ডের out-of-order ইভেন্ট মেনে নেয়।Flink-এ watermark এর কয়েকটি ধরন রয়েছে:
Apache Flink-এ Watermark এবং Event Time Processing ব্যবহার করে real-time ডেটা প্রসেসিং আরও নির্ভুল এবং সময়ানুগ করা যায়। এটি real-world অ্যাপ্লিকেশনগুলোর জন্য একটি শক্তিশালী টুল, যা stream processing এবং latency-sensitive ডেটা এনালিটিক্স এর জন্য খুবই উপযোগী।
Read more